package org.apache.flink.cdc.connectors.gaussdb.source.config;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ReplicationConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    //此处配置数据库IP以及端口，这里的端口为haPort，通常默认是所连接DN的port+1端口
    //jdbc:gaussdb://*.*.*.*:8001/database
    private String sourceURL;

    // 复制槽的名称:replication_slot
    private String slotName;
    private Properties properties;
    private Map<String, Object> slotOptions;

    public ReplicationConfig(String sourceURL, String slotName, Properties properties, Map<String, Object> slotOptions) {
        this.sourceURL = sourceURL;
        this.slotName = slotName;
        this.properties = properties;
        this.slotOptions = slotOptions;
    }

    public String getSourceURL() {
        return sourceURL;
    }

    public String getSlotName() {
        return slotName;
    }

    public Properties getProperties() {
        return properties;
    }

    public Map<String, Object> getSlotOptions() {
        return slotOptions;
    }

    public static ReplicationConfig fromProperties(Properties configProperties) {
//        String sourceURL = configProperties.getProperty("source_url");
        String sourceURL = configProperties.getProperty("source_url");
        String slotName = configProperties.getProperty("slot_name");

        Properties properties = new Properties();
        properties.setProperty("user", configProperties.getProperty("gaussdb_user"));
        properties.setProperty("password", configProperties.getProperty("gaussdb_passwd"));
        properties.setProperty("assumeMinServerVersion", "9.4");
        properties.setProperty("replication", "database");
        properties.setProperty("preferQueryMode", "simple");

        Map<String, Object> slotOptions = new HashMap<>();
        slotOptions.put("include-xids", Boolean.parseBoolean(configProperties.getProperty("include_xids", "true")));
        slotOptions.put("skip-empty-xacts", Boolean.parseBoolean(configProperties.getProperty("skip_empty_xacts", "true")));
        slotOptions.put("parallel-decode-num", Integer.parseInt(configProperties.getProperty("parallel_decode_num", "2")));
        slotOptions.put("white-table-list", configProperties.getProperty("white_table_list", "public.*"));
        slotOptions.put("standby-connection", Boolean.parseBoolean(configProperties.getProperty("standby_connection", "false")));
        slotOptions.put("decode-style", configProperties.getProperty("decode_style", "j"));
        slotOptions.put("sending-batch", Integer.parseInt(configProperties.getProperty("sending_batch", "0")));
        slotOptions.put("max-reorderbuffer-in-memory", Integer.parseInt(configProperties.getProperty("max_reorderbuffer_in_memory", "2")));
        slotOptions.put("include-user", Boolean.parseBoolean(configProperties.getProperty("include_user", "false")));
        slotOptions.put("enable-heartbeat", Boolean.parseBoolean(configProperties.getProperty("enable_heartbeat", "false")));

        return new ReplicationConfig(sourceURL, slotName, properties, slotOptions);
    }
}
